#!/usr/bin/env python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Command line utility for reading and writing Avro files."""

from avro.io import DatumReader, DatumWriter
from avro.datafile import DataFileReader, DataFileWriter
import avro.schema

try:
    import json
except ImportError:
    import simplejson as json
import csv
from sys import stdout, stdin
from itertools import ifilter, imap
from functools import partial
from os.path import splitext

class AvroError(Exception):
    """Error raised by this tool for problems that are reported to the user.

    ``main`` catches it and turns the message into a command line error.
    """

def print_json(row):
    """Print *row* to standard output as compact, single-line JSON."""
    encoded = json.dumps(row)
    print(encoded)

def print_json_pretty(row):
    """Print *row* to standard output as JSON indented by four spaces."""
    encoded = json.dumps(row, indent=4)
    print(encoded)

# Row writer used for all CSV output; it is bound to standard output.
_write_row = csv.writer(stdout).writerow
# Encoding applied to string cells in CSV output; falls back to UTF-8 when
# standard output does not report an encoding.
_encoding = stdout.encoding or "UTF-8"
def _encode(v, encoding=_encoding):
    """Return *v* encoded with *encoding* if it is a string, else unchanged.

    Non-string values (numbers, None, ...) are returned as-is so the CSV
    writer can format them itself.

    v -- the cell value to prepare for CSV output
    encoding -- name of the target encoding (defaults to the module-wide
                output encoding)
    """
    if not isinstance(v, basestring):
        return v
    # Honor the caller-supplied encoding; the previous code always used the
    # module default and silently ignored this argument.
    return v.encode(encoding)

def print_csv(row):
    """Write *row* (a mapping of field name to value) as one CSV line.

    Cells are emitted in sorted key order so that a given field lands in the
    same column on every line.
    """
    # FIXME: Do we want to do it in schema order?
    cells = []
    for key in sorted(row):
        cells.append(_encode(row[key]))
    _write_row(cells)

def select_printer(format):
    """Return the function that prints one record in the given *format*.

    format -- one of "json", "json-pretty" or "csv"

    Raises KeyError for any other format name.
    """
    printers = dict([
        ("json", print_json),
        ("json-pretty", print_json_pretty),
        ("csv", print_csv),
    ])
    return printers[format]

def record_match(expr, record):
    """Evaluate the Python expression *expr* with *record* bound to ``r``.

    Returns whatever the expression evaluates to; ``print_avro`` uses the
    result as the keep/drop decision for the ``--filter`` option.
    """
    # NOTE: eval() runs arbitrary Python code, so expr must only ever come
    # from a trusted source (here: the person invoking the command line).
    local_names = dict(r=record)
    return eval(expr, None, local_names)

def parse_fields(fields):
    """Split a comma separated field list into a list of field names.

    Whitespace around each name is removed and empty entries are dropped.
    Returns None when *fields* is empty, None or only whitespace.
    """
    if not fields or not fields.strip():
        return None

    trimmed = (part.strip() for part in fields.split(','))
    return [name for name in trimmed if name]

def field_selector(fields):
    """Build a function that keeps only the keys listed in *fields*.

    The returned function takes a mapping and returns a new dict holding the
    entries whose key is in *fields*; requested keys that are absent from the
    mapping are ignored.
    """
    wanted = set(fields)

    def keys_filter(obj):
        selected = {}
        for key in obj:
            if key in wanted:
                selected[key] = obj[key]
        return selected

    return keys_filter

def print_avro(avro, opts):
    """Print the records of *avro* according to the ``cat`` options in *opts*.

    avro -- an iterator over records
    opts -- parsed options; reads header, format, filter, skip, fields, count

    Processing order: filter, then skip, then field selection, then printing
    of at most ``opts.count`` records.

    Raises AvroError when a header is requested for a non-CSV format.
    """
    if opts.header and opts.format != "csv":
        raise AvroError("--header applies only to CSV format")

    records = avro

    # Filtering comes first so that skip and count apply to matching records.
    if opts.filter:
        matches = partial(record_match, opts.filter)
        records = ifilter(matches, records)

    # Drop the leading records; nothing to print if the input runs out.
    for _ in xrange(opts.skip):
        try:
            next(records)
        except StopIteration:
            return

    selected = parse_fields(opts.fields)
    if selected:
        records = imap(field_selector(selected), records)

    emit = select_printer(opts.format)
    for index, record in enumerate(records):
        # The header is derived from the first record's keys, in the same
        # sorted order the CSV printer uses.
        if index == 0 and opts.header:
            _write_row(sorted(record.keys()))
        if index >= opts.count:
            break
        emit(record)

def print_schema(avro):
    """Pretty print the schema stored in the metadata of *avro*.

    avro -- an object whose ``meta`` mapping holds the schema as JSON text
            under the "avro.schema" key
    """
    schema = avro.meta["avro.schema"]
    # Round-trip through the JSON parser to get an indented rendering.
    # print is written in call form, like the other printers in this file;
    # with a single argument it behaves the same as the print statement.
    print(json.dumps(json.loads(schema), indent=4))

def cat(opts, args):
    """Print the records (or the schema) of every Avro file named in *args*.

    opts -- parsed options; ``print_schema`` selects schema output, the rest
            is passed on to ``print_avro``
    args -- list of file names to read

    Raises AvroError when no file is given or a file cannot be opened.
    """
    if not args:
        raise AvroError("No files to show")

    for filename in args:
        try:
            fo = open(filename, "rb")
        except (OSError, IOError) as e:
            raise AvroError("Can't open %s - %s" % (filename, e))

        # Always release the file handle, even when reading or printing
        # fails part way through.
        try:
            reader = DataFileReader(fo, DatumReader())
            if opts.print_schema:
                print_schema(reader)
            else:
                print_avro(reader, opts)
        finally:
            fo.close()

def _open(filename, mode):
    """Open *filename* in *mode*, treating "-" as a standard stream.

    For "-" the mode "rb" yields standard input and "wb" standard output;
    any other mode raises KeyError. Every other name is passed to ``open``.
    """
    if filename != "-":
        return open(filename, mode)

    standard_streams = {"rb": stdin, "wb": stdout}
    return standard_streams[mode]

def iter_json(info, _):
    """Lazily yield one decoded JSON value per item (line) of *info*.

    info -- iterable of JSON documents, e.g. an open file with one document
            per line
    _ -- unused; present so the signature matches ``iter_csv(info, schema)``
    """
    # A generator expression is as lazy as itertools.imap but does not rely
    # on a Python-2-only name.
    return (json.loads(line) for line in info)

def _parse_boolean(text):
    """Parse a CSV cell as a boolean; raise ValueError if it is not one.

    bool() cannot be used here because any non-empty string, including
    "false", is truthy.
    """
    if isinstance(text, bool):
        return text
    token = text.strip().lower()
    if token in ("true", "1", "yes"):
        return True
    if token in ("false", "0", "no", ""):
        return False
    raise ValueError("not a boolean: %r" % (text,))


# Converters from CSV text to a Python value, keyed by Avro type name.
# int() is used for "long" as well: it accepts values of any size.
_PRIMITIVE_CONVERTERS = {
    "int" : int,
    "long" : int,
    "float" : float,
    "double" : float,
    "string" : str,
    "bytes" : str,
    "boolean" : _parse_boolean,
    "null" : lambda _: None,
}


def _convert_union_branches(value, branches):
    """Convert *value* with the first schema in *branches* that accepts it.

    Returns None when no branch accepts the value.
    """
    for branch in branches:
        if branch.type == "null":
            # Only an empty cell counts as null; otherwise a leading "null"
            # branch would swallow every value.
            if value == "":
                return None
            continue
        try:
            return _convert_by_schema(value, branch)
        except ValueError:
            continue
    return None


def _convert_by_schema(value, schema):
    """Convert *value* according to *schema* (read via its ``type`` name)."""
    kind = schema.type
    if kind == "union":
        return _convert_union_branches(value, schema.schemas)
    return _PRIMITIVE_CONVERTERS[kind](value)


def convert(value, field):
    """Convert the CSV cell *value* to the Python type of schema *field*.

    value -- the raw cell text
    field -- a schema field; ``field.type`` is its schema and
             ``field.type.type`` the type name

    Raises ValueError when the text cannot be converted and KeyError when the
    type name is not one of the supported primitive types or "union".
    """
    return _convert_by_schema(value, field.type)


def convert_union(value, field):
    """Convert *value* using the first matching branch of a union *field*.

    The branches are taken from ``field.type.schemas`` and tried in order.
    Returns None when no branch accepts the value.
    """
    return _convert_union_branches(value, field.type.schemas)

def iter_csv(info, schema):
    """Yield one record dict per CSV row read from *info*.

    info -- iterable of CSV lines, e.g. an open file
    schema -- record schema; its ``fields`` give the column names and types,
              matched to the cells by position

    Cells beyond the number of fields, and fields beyond the number of cells,
    are ignored.
    """
    fields = schema.fields
    names = [field.name for field in fields]
    for cells in csv.reader(info):
        record = {}
        for name, cell, field in zip(names, cells, fields):
            record[name] = convert(cell, field)
        yield record

def guess_input_type(files):
    """Guess the input format from the extension of the first file name.

    Returns "json" for .json/.js, "csv" for .csv (case-insensitive), and
    None when *files* is empty or the extension is not recognized.
    """
    if not files:
        return None

    extension = splitext(files[0])[1].lower()
    by_extension = {".json": "json", ".js": "json", ".csv": "csv"}
    return by_extension.get(extension)

def write(opts, files):
    """Write the records found in *files* to an Avro data file.

    opts -- parsed options; reads schema (path of the schema file),
            input_type ("json", "csv" or None to guess from the first file
            name) and output (path, or "-" for standard output)
    files -- input file names; standard input is read when the list is empty

    Raises AvroError when the schema option is missing, the input type cannot
    be determined, or a file cannot be opened.
    """
    if not opts.schema:
        raise AvroError("No schema specified")

    input_type = opts.input_type or guess_input_type(files)
    if not input_type:
        raise AvroError("Can't guess input file type (not .json or .csv)")

    try:
        # Close the schema file as soon as it has been read.
        with open(opts.schema, "rb") as schema_file:
            schema = avro.schema.parse(schema_file.read())
        out = _open(opts.output, "wb")
    except (IOError, OSError) as e:
        raise AvroError("Can't open file - %s" % e)

    writer = DataFileWriter(out, DatumWriter(), schema)
    iter_records = {"json" : iter_json, "csv" : iter_csv}[input_type]

    try:
        for filename in (files or ["-"]):
            try:
                info = _open(filename, "rb")
            except (IOError, OSError) as e:
                # Report unreadable inputs the same way `cat` does.
                raise AvroError("Can't open %s - %s" % (filename, e))

            try:
                for record in iter_records(info, schema):
                    writer.append(record)
            finally:
                # Release input files promptly, but leave standard input open.
                if info is not stdin:
                    info.close()
    finally:
        # Close the writer even when an input fails part way through.
        writer.close()

def main(argv=None):
    """Command line entry point: parse *argv* and run `cat` or `write`.

    argv -- full argument list including the program name; defaults to
            ``sys.argv``

    Exits through the option parser's error handling for usage problems and
    AvroError, and raises SystemExit with a "panic" message for any other
    exception.
    """
    import sys
    from optparse import OptionParser, OptionGroup

    argv = argv or sys.argv

    parser = OptionParser(description="Display/write for Avro files",
                      version="@AVRO_VERSION@",
                      usage="usage: %prog cat|write [options] FILE [FILE...]")
    # cat options

    cat_options = OptionGroup(parser, "cat options")
    cat_options.add_option("-n", "--count", default=float("Infinity"),
                    help="number of records to print", type=int)
    cat_options.add_option("-s", "--skip", help="number of records to skip",
                           type=int, default=0)
    cat_options.add_option("-f", "--format", help="record format",
                           default="json",
                           choices=["json", "csv", "json-pretty"])
    cat_options.add_option("--header", help="print CSV header", default=False,
                   action="store_true")
    cat_options.add_option("--filter", help="filter records (e.g. r['age']>1)",
                    default=None)
    cat_options.add_option("--print-schema", help="print schema",
                      action="store_true", default=False)
    cat_options.add_option('--fields', default=None,
                help='fields to show, comma separated (show all by default)')
    parser.add_option_group(cat_options)

    # write options
    write_options = OptionGroup(parser, "write options")
    write_options.add_option("--schema", help="schema file (required)")
    write_options.add_option("--input-type",
                             help="input file(s) type (json or csv)",
                             choices=["json", "csv"], default=None)
    write_options.add_option("-o", "--output", help="output file", default="-")
    parser.add_option_group(write_options)

    opts, args = parser.parse_args(argv[1:])
    if len(args) < 1:
        parser.error("You must specify `cat` or `write`")  # Will exit

    # The first positional argument is the command; the rest are file names.
    command = args.pop(0)
    try:
        if command == "cat":
            cat(opts, args)
        elif command == "write":
            write(opts, args)
        else:
            raise AvroError("Unknown command - %s" % command)
    except AvroError as e:
        parser.error("%s" % e) # Will exit
    except Exception as e:
        raise SystemExit("panic: %s" % e)

# Run the command line interface only when executed as a script, not when
# the module is imported.
if __name__ == "__main__":
    main()

